package cn.xshi.oauth.client.publisher.handler;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoop;
import io.netty.util.ReferenceCountUtil;
import cn.xshi.common.util.JsonUtil;
import cn.xshi.common.util.SpringUtils;
import cn.xshi.oauth.client.constant.Constant;
import cn.xshi.oauth.client.publisher.NettyClient;
import cn.xshi.oauth.client.publisher.NettyClientUtil;
import cn.xshi.oauth.client.util.TokenAttributesUtil;
import cn.xshi.oauth.client.vo.RequestInfo;
import cn.xshi.oauth.client.vo.Transfer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
/**
 * @Desc
 * @Author 邓纯杰
 * @CreateTime 2012-12-12 12:12:12
 */
@Slf4j
@Component
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    private NettyClient nettyClient;

    private String clientId;//客户端id（每个服务对应一个客户端唯一id）

    private String clientGroupId;//组Id（可以存多个服务共享一个组Id）

    public NettyClientHandler(){

    }

    public NettyClientHandler(NettyClient nettyClient, String clientGroupId, String clientId){
        this.nettyClient = nettyClient;
        this.clientGroupId = clientGroupId;
        this.clientId = clientId;
    }


    /**
     * 连接服务器成功 发送握手信息
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        RequestInfo requestInfo = new RequestInfo();
        requestInfo.setClientGroupId(clientGroupId);
        requestInfo.setClientId(clientId);
        requestInfo.setShakeHands(true);
        requestInfo.setMessage("连接Oauth Server服务端成功");
        NettyClientUtil nettyClientUtil = new NettyClientUtil();
        nettyClientUtil.sendMessage(ctx.channel(),requestInfo);
        log.info("连接Oauth Server服务端成功 .....");
    }

    /**
     * 接收服务端信息
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            String message = msg.toString();
            executeTask(message);
        }catch (Exception e){
            log.info("读取授权中心发送的消息异常：{}",e);
        }finally {
            ReferenceCountUtil.release(msg);//释放缓存区域，防止内存泄漏
        }
    }

    /**
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * 服务端挂了 调用重连机制
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress();
        int port = ipSocket.getPort();
        String host = ipSocket.getHostString();
        log.info("---------服务端断开连接-----------" + port);
        final EventLoop eventLoop = ctx.channel().eventLoop();
        log.info("---------服务端重新连接-----------" + port);
        Bootstrap bootstrap = nettyClient.initBootstrap(eventLoop);
        nettyClient.doConnect(bootstrap,host,port);
        super.channelInactive(ctx);
    }

//    /**
//     * 客户端注册
//     * @param ctx
//     * @throws Exception
//     */
//    @Override
//    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
//        log.info("当前服务注册:" + ctx.channel().id());
//        /*
//        RequestInfo requestInfo = new RequestInfo();
//        requestInfo.setClientGroupId(clientGroupId);
//        requestInfo.setClientId(clientId);
//        requestInfo.setMessage("注册服务...");
//        Attribute attribute = ctx.channel().attr(JEHC_CLOUD_NETTY_CLIENT_CHANNEL);
//        attribute.set(requestInfo);
//        */
//        super.channelRegistered(ctx);
//    }
//
//    /**
//     * 客户端退出
//     * @param ctx
//     * @throws Exception
//     */
//    @Override
//    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
//        super.channelUnregistered(ctx);
//        /*
//        Object requestInfo = ctx.channel().attr(JEHC_CLOUD_NETTY_CLIENT_CHANNEL).get();
//        */
//        log.info("退出服务：" +ctx.channel().id().asLongText());
//    }

    //采用线程池做业务处理

    /**
     * 处理消息
     * @param message
     */
    public void executeTask(String message){
        try {
            ThreadPoolTaskExecutor threadPoolTaskExecutor = SpringUtils.getBean("oauthClientThreadPool",ThreadPoolTaskExecutor.class);

            threadPoolTaskExecutor.execute(new Runnable() {
                public void run() {
                    log.info("接收授权中心服务端消息: {}", message);
                    RequestInfo requestInfo = JsonUtil.fromAliFastJson(message, RequestInfo.class);
                    if(null != requestInfo && null != requestInfo.getData()){
                        TokenAttributesUtil tokenAttributesUtil = SpringUtils.getBean(TokenAttributesUtil.class);
                        String token = requestInfo.getData().toString();
                        Transfer transfer = JsonUtil.fromAliFastJson(token,Transfer.class);
                        if(transfer.getActionType().equals(Constant.TOKEN_DESTORY)){
                            tokenAttributesUtil.remove(transfer.getToken());//直接remove
                        }else{
                            tokenAttributesUtil.put(transfer.getToken(),transfer);
                        }
                    }
                }
            });
        }catch (Exception e){
            log.error("处理消息异常：{}",e);
        }
    }
}
